﻿using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Core.Framework.Model.WebSockets
{
    public class WebSocketApplication
    {

        #region 键值队

        /// <summary>
        ///  客户机组
        /// </summary>
        public static ConcurrentDictionary<WebSocket, ClientInfo> ClientsPool =
            new ConcurrentDictionary<WebSocket, ClientInfo>();

        /// <summary>
        /// 客户ID组
        /// projecttoken_clientid
        /// </summary>
        public static ConcurrentDictionary<string, WebSocket> ClientIdsPool =
            new ConcurrentDictionary<string, WebSocket>();

        /// <summary>
        /// 渠道关系组
        /// template_projecttoken_channle
        /// [list clientids]
        /// </summary>
        public static ConcurrentDictionary<string, List<string>> ChannlesPool =
            new ConcurrentDictionary<string, List<string>>();


        /// <summary>
        /// 主题绑定订阅客户机组
        /// </summary>
        public static ConcurrentDictionary<string, List<string>> TempSubscriptions =
            new ConcurrentDictionary<string, List<string>>();


        #endregion


        #region 客户机管理

        /// <summary>
        /// 删除客户机
        /// </summary>
        /// <param name="client"></param>
        public static void ClientLoginOut(WebSocket client,  bool isRetry = true)
        {
            if (ClientsPool.ContainsKey(client))
            {
                var result = ClientsPool.TryRemove(client, out ClientInfo model);

                if (!result & isRetry)
                {
                    ClientLoginOut(client,  false);
                }
                else
                {
                    DictionaryLoginOut(client, model);

                    try
                    {
                        var msgParameter = new { type = "userOutlogin", date = "以下线" };

                        SendAsync(client, msgParameter).AsTask().Wait();

                        client.Dispose();
                    }
                    catch (Exception e)
                    {
                        //throw e;
                    }
                    finally
                    {
                        if (client != null)
                        {
                            client.Dispose();
                        }
                    }
                    
                }

                if (!result)
                {
                    throw new Exception("系统异常请重试");
                }
            }
        }

        /// <summary>
        /// 注册客户机
        /// </summary>
        /// <param name="client"></param>
        /// <param name="clientInfo"></param>
        public static void ClientLogin(WebSocket client, ClientInfo clientInfo, bool isRetry = true)
        {
            // 判断是否存在
            if (!ClientsPool.ContainsKey(client))
            {
                var result = ClientsPool.TryAdd(client, clientInfo);

                if (!result & isRetry)
                {
                    ClientLogin(client, clientInfo, false);
                }

                if (!result)
                {
                    throw new Exception("系统异常请重试");
                }
                else
                {
                    DictionaryLogin(client, clientInfo);
                }

            }
            else
            {
                // 先清除
                ClientLoginOut(client);

                // 在重新登陆
                ClientLogin(client, clientInfo);
            }
        }

        #endregion


        #region 维护渠道

        /// <summary>
        /// 用户登陆
        /// </summary>
        /// <param name="client">ws会话</param>
        /// <param name="clientInfo">客户端信息</param>
        static void DictionaryLogin(WebSocket client, ClientInfo clientInfo)
        {
            // 客户端ID
            var clientId = clientInfo.User.Key;

            // 订阅主题
            var template = clientInfo.Template;

            // 客户ID组键值
            var clientHashName = GetClientHashName(clientInfo);

            // 构造关联 客户端 键值队
            ClientIdsPool.TryAdd(clientHashName, client);

            // 构造 渠道 键值队
            foreach (var channle in clientInfo.User.Subscription)
            {
                // 渠道关系组表名
                var channleHashName = GetChannleHashName(clientInfo, channle);

                // *** 为服务器订阅该客户端 ***
                TempSubscriptions[template].Add(channle);

                // 在该渠道注册此用户
                if (ChannlesPool.ContainsKey(channleHashName))
                    ChannlesPool[channleHashName].Add(clientId);
                else
                    ChannlesPool.TryAdd(channleHashName, new List<string>() { clientId });
            }

        }

        /// <summary>
        /// 用户登出
        /// </summary>
        /// <param name="client">ws会话</param>
        /// <param name="clientInfo">客户端信息</param>
        static void DictionaryLoginOut(WebSocket client, ClientInfo clientInfo)
        {
            // 客户端ID
            var clientId = clientInfo.User.Key;

            // 订阅主题
            var template = clientInfo.Template;

            // 客户ID组键值
            var clientHashName = GetClientHashName(clientInfo);

            // 构造关联 客户端 键值队
            ClientIdsPool.Remove(clientHashName, out client);


            // 构造 渠道 键值队
            foreach (var channle in clientInfo.User.Subscription)
            {
                // *** 从服务器取消该客户端的订阅 ***
                TempSubscriptions[template].Remove(channle);

                // 渠道关系组表名
                var channleHashName = GetChannleHashName(clientInfo, channle);

                // 移除订阅
                ChannlesPool[channleHashName].Remove(clientId);
            }

        }


        /// <summary>
        /// 用户添加订阅
        /// </summary>
        /// <param name="clientInfo">客户机信息</param>
        /// <param name="Channle">订阅渠道</param>
        public static void ClientSubscribe(ClientInfo clientInfo, string Channle)
        {
            // 客户端ID
            var clientId = clientInfo.User.Key;

            // 订阅主题
            var template = clientInfo.Template;

            // 客户ID组键值
            var ccc = GetClientHashName(clientInfo);

            // 查看用户是否在线
            if (ClientIdsPool.ContainsKey(ccc))
            {

                // 获取到会话
                var ws = ClientIdsPool[ccc];

                // 构造订阅会话
                List<string> list = clientInfo.User.Subscription.ToList();
                list.Add(Channle);
                clientInfo.User.Subscription = list.ToArray();

                // 修改用户信息
                ClientsPool[ws].User.Subscription = clientInfo.User.Subscription;


                // 渠道关系组表名
                var channleHashName = GetChannleHashName(clientInfo,Channle);


                // 在该渠道注册此用户
                if (ChannlesPool.ContainsKey(channleHashName))
                    ChannlesPool[channleHashName].Add(clientId);
                else
                    ChannlesPool.TryAdd(channleHashName, new List<string>() { clientId });

                // *** 为服务器订阅该客户端 ***
                TempSubscriptions[template].Add(Channle);

            }
        }



        /// <summary>
        /// 用户取消订阅
        /// </summary>
        /// <param name="clientInfo">客户机信息</param>
        /// <param name="Channle">订阅渠道</param>
        public static void ClientUnSubscribe(ClientInfo clientInfo, string Channle)
        {
            // 客户端ID
            var clientId = clientInfo.User.Key;

            // 订阅主题
            var template = clientInfo.Template;

            // 客户ID组键值
            var ccc = GetClientHashName(clientInfo);

            // 查看用户是否在线
            if (ClientIdsPool.ContainsKey(ccc))
            {

                // 获取到会话
                var ws = ClientIdsPool[ccc];

                // 构造订阅会话
                List<string> list = clientInfo.User.Subscription.ToList();
                list.Remove(Channle);
                clientInfo.User.Subscription = list.ToArray();

                // 修改用户信息
                ClientsPool[ws].User.Subscription = clientInfo.User.Subscription;


                // 渠道关系组表名
                var channleHashName = GetChannleHashName(clientInfo, Channle);


                // 从该渠道注销此用户
                if (ChannlesPool.ContainsKey(channleHashName))
                    ChannlesPool[channleHashName].Remove(clientId);

                // *** 为服务器订阅该客户端 ***
                TempSubscriptions[template].Add(Channle);

            }
        }





        /// <summary>
        /// 修改用户信息
        /// </summary>
        /// <param name="clientInfo">客户机信息</param>
        public static void ClientUpdateParameter(ClientInfo clientInfo)
        {

            // 客户ID组键值
            var ccc = GetClientHashName(clientInfo);

            // 查看用户是否在线
            if (ClientIdsPool.ContainsKey(ccc))
            {

                // 获取到会话
                var ws = ClientIdsPool[ccc];

                // 修改用户信息
                ClientsPool[ws].User.Parameter = clientInfo.User.Parameter;

            }

        }

        #endregion


        #region Common

        /// <summary>
        /// 客户机组 键值
        /// </summary>
        /// <param name="clientInfo"></param>
        /// <param name="Channle"></param>
        /// <returns></returns>
        public static string GetClientHashName(ClientInfo clientInfo)
        {
            // 客户端ID
            var clientId = clientInfo.User.Key;

            // 项目Token
            var projectToken = clientInfo.Project.ProjectToken;

            // 渠道关系组 键值
            return $"{projectToken}_{clientId}";
        }


        /// <summary>
        /// 渠道关系组 键值
        /// </summary>
        /// <param name="clientInfo"></param>
        /// <param name="Channle"></param>
        /// <returns></returns>
        public static string GetChannleHashName(ClientInfo clientInfo, string Channle)
        {

            // 项目Token
            var projectToken = clientInfo.Project.ProjectToken;

            // 订阅主题
            var template = clientInfo.Template.ToLower();

            // 渠道关系组 键值
            return $"{template}_{projectToken}_{Channle}";
        }

        #endregion




        #region 消息处理

        /// <summary>
        ///  发送消息
        /// </summary>
        /// <param name="webSocket"></param>
        /// <param name="msg"></param>
        /// <returns></returns>
        public static async ValueTask SendAsync(WebSocket webSocket, string msg)
        {

            CancellationToken cancellation = default(CancellationToken);

            var buf = Encoding.UTF8.GetBytes(msg);

            var segment = new ArraySegment<byte>(buf);

            await webSocket.SendAsync(segment, WebSocketMessageType.Text, true, cancellation);

        }

        /// <summary>
        ///  发送消息
        /// </summary>
        /// <param name="webSocket"></param>
        /// <param name="msg"></param>
        /// <returns></returns>
        public static async ValueTask SendAsync(WebSocket webSocket, object msg)
        {
            await SendAsync(webSocket, JsonConvert.SerializeObject(msg));
        }

        /// <summary>
        /// 接受消息
        /// </summary>
        /// <param name="webSocket">会话session</param>
        /// <param name="cancellationToken">CancellationToken</param>
        /// <param name="OutLogin">退出登陆</param>
        /// <returns></returns>
        public static async Task RecvAsync(
            WebSocket webSocket, 
            CancellationToken cancellationToken,
            Action<WebSocket> OutLogin)
        {

            using (MemoryStream memory = new MemoryStream())
            {

                var buffer = new ArraySegment<byte>(new byte[1024 * 8]);

                WebSocketReceiveResult result;

                // 接受消息
                do
                {
                    cancellationToken.ThrowIfCancellationRequested(); 

                    result = await webSocket.ReceiveAsync(buffer, cancellationToken);

                    memory.Write(buffer.Array, buffer.Offset, result.Count - buffer.Offset);

                } while (!result.EndOfMessage);

                memory.Seek(0, SeekOrigin.Begin);

                using (StreamReader reader = new StreamReader(memory))
                {

                    // 继续接受消息
                    if (webSocket.State == WebSocketState.Open)
                    {
                        // 处理接收到的消息
                        MessageBranchAction(webSocket, reader.ReadToEndAsync());

                        // 循环接收
                        await RecvAsync(webSocket, cancellationToken, OutLogin);

                    }
                    else
                    {
                        OutLogin(webSocket);
                    }

                }

            }

        }

        /// <summary>
        /// 消息分支动作
        /// </summary>
        /// <param name="webSocket">Session会话</param>
        /// <param name="msg">消息</param>
        public static async void MessageBranchAction(WebSocket webSocket, Task<string> msg)
        {

            string message = msg.Result;

            //await SendAsync(webSocket, new { type = "length", length = message.Length });

            switch (message)
            {
                // 心跳
                case string m when m.StartsWith("ixpe_heartCheck:"):

                    if (ClientsPool.ContainsKey(webSocket))
                        ClientsPool[webSocket].EndHeartTime = DateTime.Now;
                    
                    await SendAsync(webSocket, new { type = "heart", date = m });
                    return;

            }

        }

        #endregion


    }
}
